1 /* 2 Copyright: Marcelo S. N. Mancini (Hipreme|MrcSnm), 2018 - 2021 3 License: [https://creativecommons.org/licenses/by/4.0/|CC BY-4.0 License]. 4 Authors: Marcelo S. N. Mancini 5 6 Copyright Marcelo S. N. Mancini 2018 - 2021. 7 Distributed under the CC BY-4.0 License. 8 (See accompanying file LICENSE.txt or copy at 9 https://creativecommons.org/licenses/by/4.0/ 10 */ 11 module hip.util.concurrency; 12 13 14 version(CustomRuntimeTest){} 15 else 16 { 17 version(Windows) version = HipConcurrency; 18 version(Android) version = HipConcurrency; 19 version(UWP) version = HipConcurrency; 20 version(linux) version = HipConcurrency; 21 } 22 23 24 25 version(HipConcurrency) 26 { 27 28 /** 29 * Test for wrapping atomic operations in a structure 30 */ 31 struct Atomic(T) 32 { 33 import core.atomic; 34 private T value; 35 36 auto opAssign(T)(T value) 37 { 38 atomicStore(this.value, value); 39 return value; 40 } 41 private @property T v(){return atomicLoad(value);} 42 alias v this; 43 44 } 45 46 ///Tries to implement a `volatile` java style 47 struct Volatile(T) 48 { 49 import core..volatile; 50 private T value; 51 52 auto synchronized opAssign(T)(T value) 53 { 54 volatileStore(&this.value, value); 55 return value; 56 } 57 private @property synchronized T v(){return volatileLoad(value);} 58 alias v this; 59 } 60 61 import core.thread; 62 import core.sync.mutex : Mutex; 63 import core.sync.semaphore:Semaphore; 64 65 class DebugMutex 66 { 67 private string lastFileLock; 68 private size_t lastLineLock; 69 private ThreadID lastID; 70 71 private string lastFileUnlock; 72 private size_t lastLineUnlock; 73 74 private Mutex mtx; 75 76 private ThreadID mainThreadId; 77 78 this(ThreadID mainId = ThreadID.init) 79 { 80 this.mainThreadId = mainId; 81 mtx = new Mutex(); 82 } 83 void lock(string file = __FILE__, size_t line = __LINE__) 84 { 85 import std.process:thisThreadID; 86 if(lastLineLock == 0) 87 { 88 lastLineUnlock = 0; 89 lastFileUnlock = null; 90 91 lastFileLock = file; 92 lastLineLock = line; 93 lastID = thisThreadID; 94 } 95 else 96 { 97 version(Desktop) 98 { 99 import std.stdio; 100 import hip.util.conv:to; 101 string last = (lastID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(lastID)~")"; 102 string curr = (thisThreadID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(thisThreadID)~")"; 103 104 writeln("Tried to lock a locked mutex at ", file, ":", line, 105 "\n\tLast locked at ", lastFileLock, ":",lastLineLock, " ", last, 106 " Current Thread is ",curr 107 ); 108 } 109 } 110 mtx.lock(); 111 } 112 void unlock(string file = __FILE__, size_t line = __LINE__) 113 { 114 version(Desktop) 115 { 116 import std.process:thisThreadID; 117 if(lastLineLock == 0) 118 { 119 import std.stdio; 120 import hip.util.conv:to; 121 string last = (lastID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(lastID)~")"; 122 string curr = (thisThreadID == mainThreadId ? "Main " : "") ~ "Thread("~to!string(thisThreadID)~")"; 123 124 writeln( 125 "Tried to unlock an unlocked mutex at ", file, ":", line, 126 "\n\tLast unlocked at ", lastFileUnlock, ":",lastLineUnlock, " ", last, 127 " Current Thread is ",curr 128 ); 129 // throw new Error("Tried to unlock an unlocked mutex"); 130 } 131 } 132 lastLineUnlock = line; 133 lastFileUnlock = file; 134 lastFileLock = null; 135 lastLineLock = 0; 136 mtx.unlock(); 137 } 138 139 } 140 141 class HipWorkerThread : Thread 142 { 143 private struct WorkerJob 144 { 145 string name; 146 void delegate() task; 147 void delegate(string taskName) onTaskFinish; 148 } 149 private WorkerJob[] jobsQueue; 150 private Semaphore semaphore; 151 private bool isAlive; 152 private DebugMutex mutex; 153 private HipWorkerPool pool; 154 private ThreadID mainThreadID; 155 156 157 this(HipWorkerPool pool = null, ThreadID mainThreadID = ThreadID.init) 158 { 159 super(&run); 160 if(pool) 161 this.pool = pool; 162 isAlive = true; 163 semaphore = new Semaphore; 164 this.mainThreadID = mainThreadID; 165 mutex = new DebugMutex(mainThreadID); 166 } 167 /** 168 * This thread goes into an invalid state after finishing it. It should not be used anymore 169 */ 170 void finish() 171 { 172 mutex.lock(); 173 isAlive = false; 174 semaphore.notify; 175 mutex.unlock(); 176 } 177 bool isIdle() 178 { 179 mutex.lock(); 180 bool ret = isIdleImpl(); 181 mutex.unlock(); 182 return ret; 183 } 184 private bool isIdleImpl() 185 { 186 return jobsQueue.length == 0; 187 } 188 /** 189 * Synchronized push on queue 190 */ 191 void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null) 192 { 193 if(isAlive) 194 { 195 mutex.lock(); 196 jobsQueue~= WorkerJob(name, task, onTaskFinish); 197 mutex.unlock(); 198 semaphore.notify(); 199 } 200 else 201 { 202 import std.stdio; 203 writeln("Thread is not alive to get tasks."); 204 } 205 } 206 207 void startWorking() 208 { 209 if(!isRunning) 210 start(); 211 } 212 void await(bool rethrow = true) 213 { 214 // pushTask("await", () => finish); 215 // join(rethrow); 216 } 217 218 void run() 219 { 220 while(isAlive) 221 { 222 mutex.lock(); 223 if(!isIdleImpl) 224 { 225 WorkerJob job = jobsQueue[0]; 226 mutex.unlock(); 227 import std.stdio; 228 try 229 { 230 mutex.lock(); 231 job.task(); 232 if(job.onTaskFinish != null) 233 { 234 job.onTaskFinish(job.name); 235 } 236 mutex.unlock(); 237 } 238 catch(Error e) 239 { 240 onAnyException(true, e.toString()); 241 return; 242 } 243 catch(Exception e) 244 { 245 onAnyException(false, e.toString()); 246 return; 247 } 248 mutex.lock(); 249 jobsQueue = jobsQueue[1..$]; 250 mutex.unlock(); 251 } 252 else 253 mutex.unlock(); 254 semaphore.wait; 255 } 256 } 257 258 private void onAnyException(bool isError, string message) 259 { 260 import std.stdio; 261 isAlive = false; 262 if(pool) 263 pool.onHipThreadError(this, isError,message); 264 } 265 void dispose() 266 { 267 finish(); 268 destroy(semaphore); 269 destroy(mutex); 270 } 271 } 272 273 274 class HipWorkerPool 275 { 276 HipWorkerThread[] threads; 277 protected Semaphore awaitSemaphore; 278 protected void delegate()[] finishHandlersOnMainThread; 279 protected void delegate()[] onAllTasksFinishHandlers; 280 protected DebugMutex handlersMutex; 281 282 private struct Task 283 { 284 string name; 285 void delegate() task; 286 void delegate(string taskName) onTaskFinish = null; 287 } 288 private Task[] mainThreadTasks; 289 private uint awaitCount = 0; 290 private size_t tasksCount; 291 292 293 this(size_t poolSize) 294 { 295 threads = new HipWorkerThread[](poolSize); 296 import std.process:thisThreadID; 297 auto mainId = thisThreadID; 298 handlersMutex = new DebugMutex(mainId); 299 for(size_t i = 0; i < poolSize; i++) 300 threads[i] = new HipWorkerThread(this, mainId); 301 awaitSemaphore = new Semaphore(0); 302 } 303 304 void addOnAllTasksFinished(void delegate() onAllFinished) 305 { 306 if(tasksCount == 0) 307 onAllFinished(); 308 else 309 onAllTasksFinishHandlers~= onAllFinished; 310 } 311 312 protected void onHipThreadError(HipWorkerThread worker, bool isError, string message) 313 { 314 if(awaitCount > 0) 315 { 316 awaitSemaphore.notify(); 317 } 318 import hip.util.array; 319 import std.stdio; 320 writeln("Worker ", worker.jobsQueue[0].name, " failed with ", isError ? "error" : "exception", ":", message); 321 threads.remove(worker); 322 } 323 void await() 324 { 325 awaitCount = 0; 326 foreach(thread; threads) 327 { 328 if(!thread.isIdle) 329 { 330 thread.pushTask("Await", () 331 { 332 awaitSemaphore.notify; 333 }); 334 awaitCount++; 335 } 336 } 337 startWorking(); 338 while(awaitCount > 0) 339 { 340 awaitSemaphore.wait(); 341 awaitCount--; 342 } 343 } 344 /** 345 * If there is no idle thread, null will be returned and the task and onFinish callbacks will be executed on that same thread. 346 * - Keep in mind that pushin task is not enough. You need to call startWorking() to make it active after pushing tasks 347 */ 348 HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = false) 349 { 350 handlersMutex.lock(); 351 tasksCount++; 352 handlersMutex.unlock(); 353 foreach(thread; threads) 354 { 355 if(thread.isIdle) 356 { 357 if(onTaskFinish !is null && isOnFinishOnMainThread) 358 thread.pushTask(name, task, notifyOnFinishOnMainThread(onTaskFinish)); 359 else 360 thread.pushTask(name, task, notifyOnFinish(onTaskFinish)); 361 return thread; 362 } 363 } 364 //Execute a main thread task if it had anything. 365 handlersMutex.lock(); 366 mainThreadTasks~= Task(name, task, notifyOnFinish(onTaskFinish)); 367 handlersMutex.unlock(); 368 return null; 369 } 370 371 protected void executeMainThreadTasks() 372 { 373 handlersMutex.lock(); 374 if(mainThreadTasks.length != 0) 375 { 376 foreach(mainThreadTask; mainThreadTasks) 377 { 378 mainThreadTask.task(); 379 if(mainThreadTask.onTaskFinish != null) 380 mainThreadTask.onTaskFinish(mainThreadTask.name); 381 } 382 mainThreadTasks.length = 0; 383 } 384 handlersMutex.unlock(); 385 } 386 387 /** 388 * This function should be called every time you push a task. 389 */ 390 void startWorking() 391 { 392 foreach(thread; threads) 393 if(!thread.isIdle) 394 thread.startWorking(); 395 executeMainThreadTasks(); 396 } 397 398 void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish = null) 399 { 400 return (name) 401 { 402 handlersMutex.lock(); 403 if(onFinish) 404 onFinish(name); 405 tasksCount--; 406 handlersMutex.unlock(); 407 }; 408 } 409 410 void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true) 411 { 412 return (name) 413 { 414 handlersMutex.lock(); 415 finishHandlersOnMainThread~= () 416 { 417 onFinish(name); 418 if(finished) 419 tasksCount--; 420 }; 421 handlersMutex.unlock(); 422 }; 423 } 424 425 bool isIdle() 426 { 427 foreach(thread; threads) 428 if(!thread.isIdle) 429 return false; 430 return true; 431 } 432 433 void pollFinished() 434 { 435 handlersMutex.lock(); 436 if(finishHandlersOnMainThread.length) 437 { 438 foreach(finishHandler; finishHandlersOnMainThread) 439 finishHandler(); 440 finishHandlersOnMainThread.length = 0; 441 } 442 if(tasksCount == 0 && onAllTasksFinishHandlers.length) 443 { 444 foreach(onAllFinish; onAllTasksFinishHandlers) 445 onAllFinish(); 446 onAllTasksFinishHandlers.length = 0; 447 } 448 handlersMutex.unlock(); 449 450 } 451 452 void dispose() 453 { 454 foreach(thread; threads) 455 thread.dispose(); 456 destroy(threads); 457 destroy(awaitSemaphore); 458 destroy(handlersMutex); 459 } 460 } 461 462 } 463 else 464 { 465 class DebugMutex 466 { 467 this(ulong id = 0){} 468 final void lock(){} 469 final void unlock(){} 470 } 471 class HipWorkerPool 472 { 473 private HipWorkerThread thread; 474 protected void delegate()[] onAllTasksFinishHandlers; 475 private void delegate()[] finishHandlersOnMainThread; 476 size_t tasksCount = 0; 477 void addOnAllTasksFinished(void delegate() onAllFinished) 478 { 479 if(tasksCount == 0) 480 onAllFinished(); 481 else 482 onAllTasksFinishHandlers~= onAllFinished; 483 } 484 485 this(size_t poolSize) 486 { 487 thread = new HipWorkerThread(this, ulong.max); 488 } 489 void delegate(string name) notifyOnFinishOnMainThread(void delegate(string taskName) onFinish, bool finished = true) 490 { 491 return (name) 492 { 493 finishHandlersOnMainThread~= () 494 { 495 onFinish(name); 496 if(finished) 497 tasksCount--; 498 }; 499 }; 500 } 501 502 void delegate(string name) notifyOnFinish(void delegate(string taskName) onFinish) 503 { 504 return (name) 505 { 506 if(onFinish) onFinish(name); 507 version(WebAssembly){} 508 else 509 tasksCount--; 510 }; 511 } 512 final void signalTaskFinish() 513 { 514 assert(tasksCount > 0, "Tried to signal task finish without tasks."); 515 tasksCount--; 516 } 517 final void await() 518 { 519 version(WebAssembly) assert(false, "Code using await does not work on WebAssembly."); 520 } 521 final void pollFinished() 522 { 523 if(finishHandlersOnMainThread.length) 524 { 525 foreach(handler; finishHandlersOnMainThread) 526 handler(); 527 finishHandlersOnMainThread.length = 0; 528 } 529 if(tasksCount == 0 && onAllTasksFinishHandlers.length) 530 { 531 foreach(onAllFinish; onAllTasksFinishHandlers) 532 onAllFinish(); 533 onAllTasksFinishHandlers.length = 0; 534 } 535 } 536 final HipWorkerThread pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null, bool isOnFinishOnMainThread = true) 537 { 538 tasksCount++; 539 version(WebAssembly) 540 assert(onTaskFinish is null, "Can't have an onTaskFinish on Wasm, implement it on a higher level using notfyOnFinish."); 541 thread.pushTask(name, task, notifyOnFinish(onTaskFinish)); 542 return thread; 543 } 544 final void startWorking(){thread.startWorking();} 545 final void finish(){} 546 final bool isIdle(){return thread.isIdle;} 547 final void dispose(){} 548 } 549 class HipWorkerThread 550 { 551 struct WorkerTask 552 { 553 void delegate() task; 554 void delegate(string taskName) onTaskFinish; 555 string name; 556 } 557 WorkerTask[] tasks; 558 559 this(HipWorkerPool pool, ulong id){} 560 final void pushTask(string name, void delegate() task, void delegate(string taskName) onTaskFinish = null) 561 { 562 tasks~= WorkerTask(task, onTaskFinish, name); 563 } 564 565 final void startWorking() 566 { 567 foreach(task; tasks) 568 { 569 task.task(); 570 if(task.onTaskFinish) 571 task.onTaskFinish(task.name); 572 } 573 tasks.length = 0; 574 } 575 576 bool isIdle(){return tasks.length == 0;} 577 } 578 }